Cloud Composer で Dataplex のデータリネージ統合を試してみた 〜カスタム リネージ イベント編〜

Cloud Composer で Dataplex のデータリネージ統合を試してみた 〜カスタム リネージ イベント編〜

Clock Icon2024.09.23

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

前回試したCloud Composer と Dataplex を使ったデータリネージの統合に引き続き、カスタム リネージ イベントを試してみました。

試したこと

Cloud Composer の標準オペレーターでは、自動でリネージが記録されますが、カスタム リネージ イベントを使うことで、標準オペレーター以外の操作(例:CLI コマンドや API 呼び出し)でもデータフローを追跡できます。

前回試した環境をベースにしていますので、詳細は下記のエントリもご参照ください。
※以降、下記エントリの内容は 前回 という文脈で引用します。

https://dev.classmethod.jp/articles/cloud-composer-dataplex/

環境作成

今回は、前回と同じ設定で環境を構築しています。
具体的には、test-composer という名前の環境を東京リージョンで作成しています。
また、前回と同様に Dataplex データリネージ統合 も有効化されています。

cloud-composer-dataplex-custom-lineage-event_01

DAG を作成する

前回は自動リネージ レポートでサポートされているオペレーターGCSToBigQueryOperatorを使用したので、自動でリネージが記録されました。

自動リネージ レポートでサポートされていないオペレーターのリネージを報告する場合は、カスタム リネージ イベントを送信します。

具体的には、下記ドキュメントに記載のとおり、BashOperatorまたはPythonOperatorのパラメータを変更します。

  • BashOperator、タスク定義の inlets パラメータまたは outlets パラメータを変更します。
  • PythonOperator。タスク定義の task.inlets パラメータまたは task.outlets パラメータを変更します。inlets パラメータに AUTO を使用すると、値はアップストリーム タスクの outlets と同じに設定されます。

inletsパラメータに入力エンティティ、outletsパラメータに出力エンティティを定義するイメージです。

今回は、BashOperator を使用して GCS バケット間でファイルをコピーするタスクを追加しました。
このタスクには、カスタム リネージ イベントを設定しており、データフロー全体が Dataplex で追跡できるようにしています。

以下は、前回作成した DAG を拡張し、GCS から BigQuery へのデータ転送に加えて、GCS バケット間のファイルコピー操作も追加した例です。

gcs_to_bigquery_custom_lineage_dag.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.composer.data_lineage.entities import GCSEntity

PROJECT_ID = '{プロジェクトID}'
DATASET_ID = 'work'
TABLE_ID = 'sales_data'
BUCKET_NAME = 'cm_enokawa_work'
SOURCE_BUCKET_NAME = 'cm_enokawa_work_source'
FILE_NAME = 'sales.csv'

# DAGの基本設定
default_args = {
    'start_date': days_ago(1),
}

with DAG(
    dag_id='gcs_to_bigquery_custom_lineage_dag',
    default_args=default_args,
    schedule_interval=None,  # 手動で実行する
    catchup=False
) as dag:

    # BashOperator を使って GCS上のファイルをコピー
    copy_gcs_file_task = BashOperator(
        task_id='copy_gcs_file',
        bash_command=f'gcloud storage cp gs://{SOURCE_BUCKET_NAME}/{FILE_NAME} gs://{BUCKET_NAME}/{FILE_NAME}',
        inlets=[GCSEntity(
            bucket=f'{SOURCE_BUCKET_NAME}',  # 入力元のGCSバケット名
            path=f'{FILE_NAME}'
        )],
        outlets=[GCSEntity(
            bucket=f'{BUCKET_NAME}',         # 出力先のGCSバケット名
            path=f'{FILE_NAME}'
        )]
    )

    # GCS から BigQuery へのデータ取り込み
    gcs_to_bq = GCSToBigQueryOperator(
        task_id='load_gcs_to_bq',
        bucket=BUCKET_NAME,  # GCS バケット名
        source_objects=[f'{FILE_NAME}'],  # GCS 内のファイルパス
        destination_project_dataset_table=f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}',  # BigQuery のデータセットとテーブル
        source_format='CSV',  # ソースファイル形式
        write_disposition='WRITE_TRUNCATE',  # 既存データの上書き
        skip_leading_rows=1  # CSV のヘッダー行をスキップ
    )

    copy_gcs_file_task >> gcs_to_bq

この DAG では、GCS バケット間のファイルコピー操作と BigQuery へのデータ取り込みを行い、カスタム リネージ イベントを使ってそのデータフローを追跡しています。

DAG を実行する

Cloud Composer の DAG 実行画面から先ほど作成した DAG を手動実行します。

cloud-composer-dataplex-custom-lineage-event_02

DAG の実行が完了し、タスクが成功したことを確認しました。
copy_gcs_file_taskタスクにより、カスタム リネージ イベントが送信され、データリネージが記録されているはずです。

cloud-composer-dataplex-custom-lineage-event_03

Dataplex の UI 上でリネージグラフを確認してみましょう。

リネージグラフ を確認する

Dataplex の UI からデータ取り込み先である BigQuery テーブルsales_dataを検索します。

cloud-composer-dataplex-custom-lineage-event_04

sales_dataを選択して 「リネージ」 を表示します。
リネージグラフは以下のように表示され、GCS(sales.csv) から BigQuery(sales_data) へのデータの流れを視覚的に確認できます。

前回試したときと同様のフローですが、GCS から BigQuery へのフロー上にある Cloud Composer のアイコンが増えています。
アイコンをクリックすると、先ほど実行したDAGgcs_to_bigquery_custom_lineage_dagのフローだということが分かります。

cloud-composer-dataplex-custom-lineage-event_05

今回追加した GCSバケット間のコピー のフローが見当たりませんね。
GCS(sales.csv)のアイコンの 「+」マーク をクリックします。

cloud-composer-dataplex-custom-lineage-event_06

すると、GCS(sales.csv)の上流にフローが展開されました。
ファイル名がsales.csvと同じなので分かりづらいですが、今回追加した GCSバケット間のコピー のフローです。

cloud-composer-dataplex-custom-lineage-event_07

GCSアイコン間のフロー上にある Cloud Composer のアイコンをクリックすると、BashOperatorで定義したタスクcopy_gcs_fileであることが分かります。

cloud-composer-dataplex-custom-lineage-event_08

カスタム リネージ イベントの送信により、リネージが記録されていることが確認できました!

まとめ

以上、Cloud Composer で Dataplex のカスタム リネージ イベントを試してみました。

カスタム リネージ イベントを使用することで、標準オペレーターではサポートされていない操作に対しても、データフローを追跡できることが確認できました。

今回は GCS バケット間のファイルコピーを例にエンティティにGCSEntityを使用しましたが、BigQuery の場合はBigQueryTableにより定義できます。

from airflow.composer.data_lineage.entities import BigQueryTable

…

bash_task = BashOperator(
   task_id='bash_task',
   dag=dag,
   bash_command='sleep 0',
   inlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table1',
   )],
   outlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table2',
   )]
)

また、Airflow 内でカスタム リネージ イベントを作成するために使用できる汎用的なエンティティとしてDataLineageEntityがあります。
GCSEntityBigQueryTableといった特定のリソース(GCS ファイルや BigQuery テーブル)を扱うエンティティに対して、DataLineageEntityは、任意のリソースやプロセスのデータリネージを追跡したい場合に使用されます。
たとえば、GCS や BigQuery 以外のサービス(API 呼び出しによるデータのやり取りなど)でのリネージを記録したい場合に便利です。

機会があれば、他のユースケースについても試してみたいと思います。

参考

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.